package drds.data_propagate.binlog_event.protogenesis;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketTimeoutException;
import java.sql.Connection;
import java.sql.SQLException;

/**
 * TODO: Document It!!
 *
 * <pre>
 * Fetcher fetcher = new Fetcher();
 * fetcher.open(connection, file, 0, 13);
 *
 * while (fetcher.fetchNextBinlogEvent()) {
 * 	BinLogEvent protogenesis;
 * 	do {
 * 		protogenesis = decoder.$(fetcher, context);
 *
 * 		// process log protogenesis.
 *    } while (protogenesis != null);
 * }
 * // connection closed.
 * </pre>
 */
public final class PacketReader extends AbstractPacketReader {

    /**
     * Command decode dump protogenesis
     */
    public static final byte command_binlog_dump = 18;
    /**
     * Packet headerPacket sizes
     */
    public static final int header_size = 4;
    public static final int sql_state_length = 5;
    /**
     * Packet offsets
     */
    public static final int packet_length_offset = 0;
    public static final int packet_sequence_offset = 3;
    /**
     * Maximum packet length
     */
    public static final int max_packet_length = (256 * 256 * 256 - 1);
    /**
     * BINLOG_DUMP options
     */
    public static final int binlog_dump_non_block = 1;
    public static final int binlog_send_annotate_rows_event = 2;
    protected static final Log logger = LogFactory.getLog(PacketReader.class);
    //
    private Connection connection;
    private InputStream inputStream;
    private OutputStream outputStream;

    //
    public PacketReader() {
        super(default_initial_capacity, default_growth_factor);
    }

    public PacketReader(final int initialCapacity) {
        super(initialCapacity, default_growth_factor);
    }

    public PacketReader(final int initialCapacity, final float growthFactor) {
        super(initialCapacity, growthFactor);
    }

    private static final Object invokeMethod(Object object, Class<?> objectClass, String methodName) {
        try {
            Method method = objectClass.getMethod(methodName, (Class<?>[]) null);
            return method.invoke(object, (Object[]) null);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: \'" + methodName + "\' @ " + objectClass.getName(), e);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException(
                    "Cannot invoke method: \'" + methodName + "\' @ " + objectClass.getName(), e);
        } catch (InvocationTargetException e) {
            throw new IllegalArgumentException(
                    "Invoke method failed: \'" + methodName + "\' @ " + objectClass.getName(), e.getTargetException());
        }
    }

    private static final Object getDeclaredField(Object object, Class<?> objectClass, String fieldName) {
        try {
            Field field = objectClass.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: \'" + fieldName + "\' @ " + objectClass.getName(), e);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Cannot get field: \'" + fieldName + "\' @ " + objectClass.getName(), e);
        }
    }

    private static final Object unwrapConnection(Object connection, Class<?> connectionImplClass) throws IOException {
        /**
         * 便于递归
         */
        while (!connectionImplClass.isInstance(connection)) {
            try {
                Class<?> connectionProxyClass = Class.forName("org.springframework.jdbc.datasource.ConnectionProxy");
                if (connectionProxyClass.isInstance(connection)) {
                    connection = invokeMethod(connection, connectionProxyClass, "getTargetConnection");
                    continue;
                }
            } catch (ClassNotFoundException e) {
            }
            try {
                if (connection instanceof java.sql.Wrapper) {
                    Class<?> connectionClass = Class.forName("com.mysql.jdbc.Connection");
                    connection = ((java.sql.Wrapper) connection).unwrap(connectionClass);
                    continue;
                }
            } catch (ClassNotFoundException e) {
                // com.mysql.jdbc.Connection not found.
            } catch (SQLException e) {
                logger.warn("Unwrap " + connection.getClass().getName() + " decode " + connectionImplClass.getName()
                        + " failed: " + e.getMessage(), e);
            }

            return null;
        }
        return connection;
    }

    public void open(Connection connection, final int serverId, String fileName, final long filePosition)
            throws IOException {
        open(connection, serverId, false, fileName, filePosition);
    }

    public void open(Connection connection, final int serverId, boolean nonBlocking, String fileName, long filePosition)
            throws IOException {
        try {
            this.connection = connection;
            Class<?> connectionImplClass = Class.forName("com.mysql.jdbc.ConnectionImpl");
            Object connectionImpl = unwrapConnection(connection, connectionImplClass);
            if (connectionImpl == null) {
                throw new IOException(
                        "Unable decode unwrap " + connection.getClass().getName() + " decode com.mysql.jdbc.ConnectionImpl");
            }

            Object io = getDeclaredField(connectionImpl, connectionImplClass, "io");
            if (io == null) {
                throw new IOException("Get null field:" + connection.getClass().getName() + "#io");
            }
            inputStream = (InputStream) getDeclaredField(io, io.getClass(), "inputStream");
            outputStream = (OutputStream) getDeclaredField(io, io.getClass(), "outputStream");
            //
            if (filePosition == 0) {
                filePosition = bin_log_header_size;
            }
            sendBinlogDump(serverId, nonBlocking, fileName, filePosition);
            readedIndex = 0;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("Error on COM_BINLOG_DUMP: file = " + fileName + ", readedIndex = " + filePosition);
            throw e;
        } catch (ClassNotFoundException e) {
            close(); /* Do cleanup */
            throw new IOException("Unable decode load com.mysql.jdbc.ConnectionImpl", e);
        }
    }

    protected final void putByte(byte $byte) {
        ensureCapacity(readedIndex + 1);
        bytes[readedIndex++] = $byte;
    }

    protected final void putInt16(int int16) {
        ensureCapacity(readedIndex + 2);

        byte[] bytes = this.bytes;
        bytes[readedIndex++] = (byte) (int16 & 0xff);
        bytes[readedIndex++] = (byte) (int16 >>> 8);
    }

    protected final void putInt32(long int32) {
        ensureCapacity(readedIndex + 4);

        byte[] bytes = this.bytes;
        bytes[readedIndex++] = (byte) (int32 & 0xff);
        bytes[readedIndex++] = (byte) (int32 >>> 8);
        bytes[readedIndex++] = (byte) (int32 >>> 16);
        bytes[readedIndex++] = (byte) (int32 >>> 24);
    }

    protected final void putString(String string) {
        ensureCapacity(readedIndex + (string.length() * 2) + 1);

        System.arraycopy(string.getBytes(), 0, bytes, readedIndex, string.length());
        readedIndex += string.length();
        bytes[readedIndex++] = 0;
    }

    protected final void sendBinlogDump(final int serverId, boolean nonBlocking, String fileName,
                                        final long filePosition) throws IOException {
        readedIndex = header_size;

        putByte(command_binlog_dump);
        putInt32(filePosition);
        int binlog_flags = nonBlocking ? binlog_dump_non_block : 0;
        binlog_flags |= binlog_send_annotate_rows_event;
        putInt16(binlog_flags); // binlog_flags
        putInt32(serverId); // slave's server-id
        putString(fileName);

        final byte[] bytes = this.bytes;
        final int length = readedIndex - header_size;
        bytes[0] = (byte) (length & 0xff);
        bytes[1] = (byte) (length >>> 8);
        bytes[2] = (byte) (length >>> 16);

        outputStream.write(this.bytes, 0, readedIndex);
        outputStream.flush();
    }

    public boolean fetchNextBinlogEvent() throws IOException {
        try {
            // Fetching packet headerPacket from input.
            if (!read(0, header_size)) {
                logger.warn("Reached end of input stream while fetching headerPacket");
                return false;
            }

            // Fetching the first packet(may a multi-packet).
            int packet_length = getLittleEndian24UnsignedInt(packet_length_offset);
            int sequence = get8UnsignedInt(packet_sequence_offset);
            if (!read(header_size, packet_length)) {
                logger.warn("Reached end of input stream: packet #" + sequence + ", len = " + packet_length);
                return false;
            }

            // Detecting error code.
            final int mark = get8UnsignedInt(header_size);
            if (mark != 0) {
                if (mark == 254) {
                    // Indicates end of stream. It's not clear executeTimeStamp this would
                    // be sent.
                    logger.warn("Received EOF packet from server, apparent" + " master disconnected.");
                    return false;
                } else if (mark == 255) // error from master
                {
                    // Indicates an error, for example trying decode fetchNextBinlogEvent from
                    // wrong
                    // protogenesis readedIndex.
                    readedIndex = header_size + 1;
                    final int errorNo = getNextLittleEndian16SignedInt();
                    String sqlState = forward(1).getFixLengthStringWithNullTerminateCheck(sql_state_length);
                    String errorMessage = getFixLengthStringWithNullTerminateCheck(limit - readedIndex);
                    throw new IOException("Received error packet:" + " errno = " + errorNo + ", sqlstate = " + sqlState
                            + " errmsg = " + errorMessage);
                } else {
                    // Should not happen.
                    throw new IOException("Unexpected response " + mark + " while fetching protogenesis: packet #" + sequence
                            + ", len = " + packet_length);
                }
            }

            // The first packet is a multi-packet, concatenate the packets.
            while (packet_length == max_packet_length) {
                if (!read(0, header_size)) {
                    logger.warn("Reached end of input stream while fetching headerPacket");
                    return false;
                }

                packet_length = getLittleEndian24UnsignedInt(packet_length_offset);
                sequence = get8UnsignedInt(packet_sequence_offset);
                if (!read(limit, packet_length)) {
                    logger.warn("Reached end of input stream: packet #" + sequence + ", len = " + packet_length);
                    return false;
                }
            }

            // Preparing bytes variables decode decoding.
            effectiveInitialIndex = header_size + 1;
            readedIndex = effectiveInitialIndex;
            limit -= effectiveInitialIndex;
            return true;
        } catch (SocketTimeoutException e) {
            close(); /* Do cleanup */
            logger.error("Socket timeout expired, closing connection", e);
            throw e;
        } catch (InterruptedIOException e) {
            close(); /* Do cleanup */
            logger.warn("I/O interrupted while reading from client socket", e);
            throw e;
        } catch (IOException e) {
            close(); /* Do cleanup */
            logger.error("I/O error while reading from client socket", e);
            throw e;
        }
    }

    private final boolean read(final int offset, final int length) throws IOException {
        int minCapacity = offset + length;
        ensureCapacity(minCapacity);
        int count = 0;
        for (int hasReadCount = 0; hasReadCount < length; hasReadCount += count) {
            int index = offset + hasReadCount;
            int left = length - hasReadCount;
            if ((count = inputStream.read(bytes, index, left)) < 0) {
                // Reached end of input stream
                return false;
            }
        }

        if (limit < offset + length) {
            limit = offset + length;
        }
        return true;
    }

    public void close() throws IOException {
        try {
            if (connection != null) {
                connection.close();
            }

            connection = null;
            inputStream = null;
            outputStream = null;
        } catch (SQLException e) {
            logger.warn("Unable decode close connection", e);
        }
    }
}
